package mqtt.receiver;

import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import mqtt.annotation.Topic;
import mqtt.utils.StringUtils;
import mqtt.utils.Task;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * @Author: Lamb
 * @Date: 2022/9/23 16:04
 */
@Slf4j
public class MqttReceiverMessageHandler implements MessageHandler {

    // key-value -> topic-Receiver
    private Map<String, Receiver> receiverMap;


    public void handleMessage(MqttMessage mqttMessage) throws MessagingException {
        log.info("---- -> mqttMessage" + mqttMessage);

        String originTopic = mqttMessage.getTopic();
        if (originTopic == null || mqttMessage.getPayload() == null) {
            return;
        }

        // 设备上下线监听，系统主题$SYS
        else if (originTopic.startsWith("$SYS")) {
            String res = originTopic.replace("$", "");
            if (originTopic.contains("|")) {
                int from = originTopic.indexOf(String.valueOf("|"));
                int end = originTopic.lastIndexOf(String.valueOf("|"));
                res = originTopic.substring(1, from) + originTopic.substring(end + 1, originTopic.length());
            }
            originTopic = res;
        }
        final String topic = originTopic;
        mqttMessage.setTopic(topic);
        // 只有获取到锁的节点才执行
//        boolean isLock = redisDistributedLock.lock(getLockKey(mqttMessage));
//        if (!isLock) {
//            log.warn("##################未获取到锁################## {}", mqttMessage);
//            return;
//        }

        ThreadUtil.execute(new Task() {
            @Override
            public void run() {
                try {
                    Receiver receiver = null;
                    for (String key : receiverMap.keySet()) {
                        String tmp = key.replace("$", "");
                        if (tmp.contains("+")) {
                            //String pattern = "^\\/sys\\/(\\w+\\/?)\\/(\\w+\\/?)";
                            String topicForRegex = tmp.replaceAll("/", "\\\\/")
                                    .replaceAll("\\+", "[^\\\\/]+");
                            boolean isMatch = Pattern.matches(topicForRegex, topic);
                            //log.debug("key: " + key + "----- topic: " + topic + "====" + isMatch);
                            if (isMatch) {
                                receiver = receiverMap.get(key);
                                break;
                            }
                        } else if (tmp.contains("#")) {
                            tmp.replace("/#", "");
                            if (topic.contains(tmp)) {
                                receiver = receiverMap.get(key);
                                break;
                            }
                        } else {
                            receiver = receiverMap.get(topic);
                        }
                    }
                    if (receiver != null) {
                        receiver.handleMessage(mqttMessage);
                    }
                } catch (Exception e) {
                    log.error("消息处理异常", e);
                }
            }
        });
    }

    private String getLockKey(MqttMessage mqttMessage) {
        StringBuilder lockKeyBuilder = new StringBuilder();
        lockKeyBuilder.append(mqttMessage.getTopic());
        lockKeyBuilder.append("_");
        lockKeyBuilder.append(mqttMessage.getQos());
        lockKeyBuilder.append("_");
        lockKeyBuilder.append(mqttMessage.getPayload());
        try {
            return StringUtils.md5(lockKeyBuilder.toString());
        } catch (NoSuchAlgorithmException e) {
            log.error("", e);
            return mqttMessage.getId() + "_" + mqttMessage.getTopic() + "_" + mqttMessage.getQos() + "_" + mqttMessage.getPayload().hashCode();
        }
    }

    /**
     * 将IOC容器中的Receiver所有实例注入
     *
     * @param receiverBeanMap
     */
    public void setReceiverBeanMap(Map<String, Receiver> receiverBeanMap) {
        receiverMap = new HashMap<>();
        Receiver receiver = null;
        Topic topic = null;
        for (Map.Entry<String, Receiver> entry : receiverBeanMap.entrySet()) {
            receiver = entry.getValue();
            if (receiver.getClass().isAnnotationPresent(Topic.class)) {
                topic = receiver.getClass().getAnnotation(Topic.class);
                receiverMap.put(topic.name(), receiver);
            }
        }
        log.info("receiverMap --- -> " + receiverMap);
    }

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
//        log.debug("--- -> message：" + message);
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload((String) (message.getPayload()));
        MessageHeaders messageHeaders = message.getHeaders();
        if (messageHeaders != null) {
            mqttMessage.setId(messageHeaders.get("id").toString());
            mqttMessage.setTopic((String) messageHeaders.get("mqtt_receivedTopic"));
            mqttMessage.setQos(parseQos(messageHeaders.get("mqtt_receivedQos")));
            mqttMessage.setRetained(parseBool(messageHeaders.get("mqtt_receivedRetained")));
            mqttMessage.setDuplicate(parseBool(messageHeaders.get("mqtt_duplicate")));
            mqttMessage.setTimestamp(parseLong(messageHeaders.get("timestamp")));
        }
        handleMessage(mqttMessage);
    }

    private long parseLong(Object obj) {
        if (obj == null) {
            return 0L;
        }

        Long l = 0L;
        try {
            l = Long.parseLong(obj.toString());
        } catch (Exception e) {
            l = 0L;
        }
        return l;
    }

    private boolean parseBool(Object obj) {
        if (obj == null) {
            return false;
        }

        Boolean b = false;
        try {
            b = Boolean.valueOf(obj.toString());
        } catch (Exception e) {
            b = false;
        }
        return b;
    }

    private int parseQos(Object obj) {
        if (obj == null) {
            return -1;
        }

        Integer qos = -1;
        try {
            qos = Integer.parseInt(obj.toString());
        } catch (Exception e) {
            qos = -1;
        }
        return qos;
    }

}
